package com.migrate.module.migrate;

import cn.hutool.core.date.DateTime;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.migrate.module.config.ApplicationContextUtil;
import com.migrate.module.constants.Constants;
import com.migrate.module.domain.*;
import com.migrate.module.enumeration.BinlogType;
import com.migrate.module.enumeration.DBChannel;
import com.migrate.module.enumeration.EtlProgressStatus;
import com.migrate.module.mapper.migrate.MigrateScrollMapper;
import com.migrate.module.service.MigrateService;
import com.migrate.module.util.DateUtils;
import com.migrate.module.util.MigrateCheckUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;

import java.math.BigDecimal;
import java.util.*;
import java.util.stream.Collectors;

/**
 * 数据核对处理
 *
 * @author zhonghuashishan
 */
@Slf4j
public class CheckDataProcessor {

    private static volatile CheckDataProcessor checkDataProcessor;

    private CheckDataProcessor(){

    }

    /**
     * 核验数据
     * @param rangeScroll 要检查的数据抽取模型
     */
    public void checkData(RangeScroll rangeScroll){
            EtlProgress etlProgress = addEtlProgress(rangeScroll);
        try {
            //1. 先获取老库的一批数据
            List<Map<String, Object>> sourceList = querySourceList(rangeScroll);
            //2. 再获取新库的一批数据
            List<Map<String, Object>> targetList = queryTargetList(sourceList, rangeScroll);
            //3. 对数据进行核对校验
            Map<BinlogType, List<Map<String, Object>>> comparisonMap = comparison(sourceList,targetList,rangeScroll);
            //4. 对数据进行归正处理
            updateComparisonData(comparisonMap,rangeScroll);
            //5. 完成数据核对校验，更改状态
            updateEtlDirtyRecord(etlProgress, EtlProgressStatus.CHECK_SUCCESS.getValue(),rangeScroll, null);
        }catch (Exception e){
            //数据核对过程失败，只记录数据核对错误信息
            updateEtlDirtyRecord(etlProgress, EtlProgressStatus.SUCCESS.getValue(),rangeScroll, e.getMessage());
            log.error("数据核对过程中发生异常 {"+e.getMessage()+"}", etlProgress);
        }
    }

    /**
     * 获取到可以核对校验的数据批次集合
     * @return 数据批次集合
     */
    public List<RangeScroll> queryCheckDataList(){
        MigrateScrollMapper migrateScrollMapper = ApplicationContextUtil.getBean(MigrateScrollMapper.class);
        // 查询已同步的数据
        EtlDirtyRecord etlDirtyRecord = new EtlDirtyRecord();
        etlDirtyRecord.setStatus(EtlProgressStatus.SUCCESS.getValue());
        // 默认一次最多查询100条数据
        List<EtlDirtyRecord> progressList = migrateScrollMapper.queryDirtyRecordList(etlDirtyRecord);
        List<RangeScroll> rangeScrollList = new ArrayList<>();
        if (CollectionUtils.isNotEmpty(progressList)){
            for (EtlDirtyRecord dirtyRecord:progressList){
                RangeScroll rangeScroll = new RangeScroll();
                rangeScroll.setId(dirtyRecord.getId());
                rangeScroll.setTicket(dirtyRecord.getTicket());
                rangeScroll.setCurTicketStage(dirtyRecord.getCurTicketStage());
                rangeScroll.setTableName(dirtyRecord.getLogicModel());
                rangeScroll.setStartScrollId(dirtyRecord.getRecordValue());
                rangeScroll.setPageSize(dirtyRecord.getSyncSize());
                rangeScroll.setEndTime(dirtyRecord.getScrollEndTime());
                rangeScroll.setStartTime(dirtyRecord.getScrollTime());
                rangeScrollList.add(rangeScroll);
            }
        }
        return rangeScrollList;
    }

    /**
     * 构建一个单例模式对象
     * @return CheckDataProcessor实例
     */
    public static CheckDataProcessor getInstance(){
        if (null == checkDataProcessor){
            synchronized (CheckDataProcessor.class){
                if (null == checkDataProcessor){
                    checkDataProcessor = new CheckDataProcessor();
                }
            }
        }
        return checkDataProcessor;
    }
    /**
     * 核对数据不一致的进行修复
     * @param comparisonMap 要比对的数据
     * @param rangeScroll 数据抽取模型
     */
    private void updateComparisonData(Map<BinlogType, List<Map<String, Object>>> comparisonMap,RangeScroll rangeScroll) {
        MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class);

        List<BinLog> updatedList = new ArrayList<>();
        for (Map.Entry<BinlogType, List<Map<String, Object>>> comparisonEntry:comparisonMap.entrySet()){
            //分别处理需要更新和新增的数据
            for (Map<String, Object> updatedMap : comparisonEntry.getValue()) {
                BinLog binLog = new BinLog();
                binLog.setOperateType(comparisonEntry.getKey().getValue());
                binLog.setTableName(rangeScroll.getTableName());
                binLog.setDataMap(updatedMap);
                updatedList.add(binLog);
            }
        }
        migrateService.migrateBat(rangeScroll.getTableName(), updatedList);
    }

    /**
     * 对数据进行核对比较，并返回核对不一致的数据
     * @param sourceList 源List
     * @param targetList 被比较的List
     * @return 异常数据Map
     */
    private Map<BinlogType, List<Map<String, Object>>> comparison(List<Map<String, Object>> sourceList, List<Map<String, Object>> targetList,RangeScroll rangeScroll) {
        String singleKey = MergeConfig.getSingleKey(rangeScroll.getTableName());
        // 核验数据，主要对比的老库的数据是否和新库不一致，以老库为准,先转换新库的数据对象为MAP，key以唯一标识字段的值为准
        Map<String, Map<String, Object>> targetMap = targetList.stream().collect(Collectors.toMap(
                s -> s.get(singleKey)+"", s -> s
        ));
        Map<BinlogType, List<Map<String, Object>>> errorMap = new HashMap<BinlogType, List<Map<String, Object>>>();
        // 遍历源数据集合信息，验证数据是否匹配
        for (Map<String, Object> sourceMap :sourceList){
            boolean isComparison = false;
            Object scrollKeyObj = sourceMap.get(singleKey);
            if (scrollKeyObj instanceof String)
            {
                String scrollKey = (String) scrollKeyObj;
                // 匹配对应的数据
                Map<String, Object> scrollMap = targetMap.get(scrollKey);
                if (null != scrollMap){
                    isComparison = MigrateCheckUtil.comparison(sourceMap, scrollMap, rangeScroll.getTableName());
                    // 数据未匹配，入异常记录
                    if (isComparison){
                        //添加需要更新的数据
                        if (errorMap.get(BinlogType.UPDATE) != null) {
                            errorMap.get(BinlogType.UPDATE).add(sourceMap);
                        } else {
                            List<Map<String, Object>> updatedList = new ArrayList<Map<String, Object>>();
                            updatedList.add(sourceMap);
                            errorMap.put(BinlogType.UPDATE, updatedList);
                        }
                    }
                } else {
                    //添加需要新增的数据
                    if (errorMap.get(BinlogType.INSERT) != null) {
                        errorMap.get(BinlogType.INSERT).add(sourceMap);
                    } else {
                        List<Map<String, Object>> insertedList = new ArrayList<Map<String, Object>>();
                        insertedList.add(sourceMap);
                        errorMap.put(BinlogType.INSERT, insertedList);
                    }
                }
            }
        }
        return errorMap;
    }

    /**
     * 根据源数据查询的信息，返回新库的数据信息
     * @param sourceList 新库的数据信息
     */
    private List<Map<String, Object>> queryTargetList(List<Map<String, Object>> sourceList,RangeScroll rangeScroll) {
        MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class);
        List<String> queryList = new ArrayList<>();
        String singleKey = MergeConfig.getSingleKey(rangeScroll.getTableName());
        // 按指定的字段名称，获取批量的具体值作为查询条件
        for (Map<String, Object> map:sourceList){
            queryList.add(map.get(singleKey)+"");
        }
        // 按组装的集合，查询新库数据对象返回
        return migrateService.findByIdentifiers(rangeScroll.getTableName(), queryList, DBChannel.CHANNEL_2.getValue());
    }

    /**
     *  从源数据，拉取全量同步的某一个批次数据
     * @param rangeScroll 数据抽取模型
     * @return 指定批次数据
     */
    private List<Map<String,Object>> querySourceList(RangeScroll rangeScroll){
        MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class);
        // 拉取某一个批次的源数据信息
        return  migrateService.queryInfoList(rangeScroll);
    }

    /**
     * 初始化记录本次迁移数据
     * @param rangeScroll 数据抽取模型
     * @return 记录本次迁移数据
     */
    private EtlProgress addEtlProgress(RangeScroll rangeScroll){
        MigrateScrollMapper migrateScrollMapper = ApplicationContextUtil.getBean(MigrateScrollMapper.class);
        // 查询是否已经有核对的数据了
        EtlProgress etlProgress = migrateScrollMapper.queryEtlProgressCheck(rangeScroll);
        if (null != etlProgress){
            return etlProgress;
        }
        etlProgress = new EtlProgress();
        etlProgress.setLogicModel(rangeScroll.getTableName());
        etlProgress.setTicket(rangeScroll.getTicket());
        etlProgress.setCurTicketStage(rangeScroll.getCurTicketStage());
        etlProgress.setRetryTimes(0);
        etlProgress.setFinishRecord(0);
        etlProgress.setProgressType(1);
        etlProgress.setStatus(EtlProgressStatus.INIT.getValue());
        //初始化查询的字段值，不存在数据库字段默认设置为1
        if (!ObjectUtils.isEmpty(rangeScroll.getStartScrollId())){
            etlProgress.setScrollId(rangeScroll.getStartScrollId());
        }
        etlProgress.setScrollTime(rangeScroll.getStartTime());
        etlProgress.setScrollEndTime(rangeScroll.getEndTime());
        etlProgress.setCreateTime(new DateTime());
        etlProgress.setUpdateTime(new DateTime());
        migrateScrollMapper.insertEtlProgress(etlProgress);

        return etlProgress;
    }

    /**
     * 更新数据迁移的进度明细信息
     * @param rangeScroll 要检查的数据抽取模型
     * @param status 状态
     * @param errorMsg 异常信息
     */
    private EtlDirtyRecord updateEtlDirtyRecord(RangeScroll rangeScroll, Integer status, String errorMsg){
        MigrateScrollMapper migrateScrollMapper = ApplicationContextUtil.getBean(MigrateScrollMapper.class);

        EtlDirtyRecord etlDirtyRecord = new EtlDirtyRecord();
        etlDirtyRecord.setLogicModel(rangeScroll.getTableName());
        etlDirtyRecord.setTicket(rangeScroll.getTicket());
        etlDirtyRecord.setCurTicketStage(rangeScroll.getCurTicketStage());
        etlDirtyRecord.setStatus(status);
        etlDirtyRecord.setErrorMsg(errorMsg);
        etlDirtyRecord.setUpdateTime(new DateTime());
        migrateScrollMapper.updateEtlDirtyRecord(etlDirtyRecord);
        return etlDirtyRecord;
    }

    /**
     *  更新 本次迁移数据的状态
     * @param etlProgress 迁移记录
     * @param status 迁移状态
     * @param rangeScroll 要检查的数据抽取模型
     * @param errorMsg 异常信息
     */
    private void updateEtlDirtyRecord(EtlProgress etlProgress,Integer status,RangeScroll rangeScroll, String errorMsg){
        //核对校验成功情况下，更新迁移记录
        if (errorMsg == null) {
            MigrateScrollMapper migrateScrollMapper = ApplicationContextUtil.getBean(MigrateScrollMapper.class);
            etlProgress.setUpdateTime(new DateTime());
            etlProgress.setCurTicketStage(rangeScroll.getCurTicketStage());
            etlProgress.setFinishRecord(etlProgress.getFinishRecord()+rangeScroll.getPageSize());
            etlProgress.setProgressType(1);
            // 更新本批次对应的迁移数据核对数据量
            migrateScrollMapper.updateEtlProgress(etlProgress);
        }
        // 更新迁移核对的明细
        updateEtlDirtyRecord(rangeScroll, status, errorMsg);
    }

    /**
     * 更新本次核对校验对应的迁移记录为核对校验完成
     * @param queryEtlProgressMap 查询迁移记录的集合
     */
    public void updateEtlProgressCheckSuccess(Map<String, RangeScroll> queryEtlProgressMap){
        MigrateScrollMapper migrateScrollMapper = ApplicationContextUtil.getBean(MigrateScrollMapper.class);

        Collection<RangeScroll> rangeScrolls = queryEtlProgressMap.values();
        for (RangeScroll rangeScroll : rangeScrolls) {
            EtlProgress etlProgress = migrateScrollMapper.queryEtlProgressCheck(rangeScroll);

            //判断是否所有批次都数据校验完成
            if (checkEtlProgressCheckSuccess(etlProgress, migrateScrollMapper)) {
                etlProgress.setUpdateTime(new DateTime());
                etlProgress.setStatus(EtlProgressStatus.CHECK_SUCCESS.getValue());
                migrateScrollMapper.updateEtlProgress(etlProgress);
            }
        }
    }

    /**
     * 判断迁移记录是否已经核对校验完成
     * @param etlProgress 迁移记录
     * @param migrateScrollMapper 滚动迁移的Mapper组件
     * @return 是否核对校验完成
     */
    public boolean checkEtlProgressCheckSuccess(EtlProgress etlProgress, MigrateScrollMapper migrateScrollMapper){
        if (etlProgress == null) {
            return false;
        }

        EtlStatistical etlStatistical = new EtlStatistical();
        etlStatistical.setLogicModel(etlProgress.getLogicModel());
        etlStatistical.setStartTime(DateUtils.format(etlProgress.getScrollTime()) );
        etlStatistical.setEndTime(DateUtils.format(etlProgress.getScrollEndTime()));

        BigDecimal statisticalCount = migrateScrollMapper.getStatisticalCount(etlStatistical);
        if (new BigDecimal(etlProgress.getFinishRecord().toString()).compareTo(statisticalCount) >= 0) {
            return true;
        }

        return false;
    }

}
